Python怎么操作MongoDB数据库 您所在的位置:网站首页 python 操作doc Python怎么操作MongoDB数据库

Python怎么操作MongoDB数据库

2023-03-26 20:14| 来源: 网络整理| 查看: 265

这篇文章主要介绍“Python怎么操作MongoDB数据库”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Python怎么操作MongoDB数据库”文章能帮助大家解决问题。

一、连接器的安装和配置

pymongo: MongoDB 官方提供的 Python 工具包。官方文档: https://pymongo.readthedocs.io/en/stable/ pip安装,命令如下:

pip install pymongo

管理 MongoDB 的主要步骤如下:

连接到 MongoDB 数据库系统

管理 MongoDB 数据库

管理 MongoDB 中的集合

管理 MongoDB 中的文档

第一步,连接 MongoDB:

# 方式一: 使用默认的配置 client = MongoClient() # 方式二: 指定主机地址和端口号 client = MongoClient('localhost', 27017) # 方式三: 使用URI连接参数 client = MongoClient('mongodb://localhost:27017/')

第二步,管理数据库:

#通过MongoClient对象来管理多个数据库获取数据库(逻辑库)对象 db = client.DATABASE_NAME db = client["DATABASE_NAME"] db = client.get_database(name=None, *args) # 查看所有的数据库(逻辑库) client.list_databases() # 删除数据库(逻辑库) client.drop_database(name_or_database, *args)

第三步,管理集合

# 通过数据库对象db来管理集合 # 获取集合对象 db = client.DATABASE_NAME db.COLLECTION_NAME client.DATABASE_NAME.COLLECTION_NAME db.get_collection(name, *args) # 查看当前数据库下的集合列表 db.list_collection_names() # 删除集合 db.drop_collection(name_or_collection, *args)

基础操作示例:

# -*- coding: utf-8 -*- # @Time    : 2023-03-17 1:47 # @Author  : AmoXiang # @File    : 1.数据库连接.py # @Software: PyCharm # @Blog    : https://blog.csdn.net/xw1680 from pymongo import MongoClient # 使用默认配置连接到数据库 # client = MongoClient() # print(client) # client.close() # 指定主机地址和端口号连接到数据库 # client = MongoClient('localhost', 27017) # 使用URI连接参数连接到数据库 client = MongoClient('mongodb://localhost:27017/') print(client) # client.close() # # # 访问数据库 # db = client.test # db = client["test"] # print(db) # db = client.get_database('test') # client.close() # print(db) # # 查看有哪些数据库 db_list = client.list_databases() # # db_list = client.list_database_names() for item in db_list:     print(item) # # 查看数据库下有哪些集合 db_test = client["test"] for item in db_test.list_collection_names():     print(item) # # # 集合对象操作 # data = db_test.students.find_one() # data = client.test.students.find_one() data = client.test.get_collection('students').find_one() print(data)

二、新增文档

说明:pymongo 在插入数据时可以将 python 的对象转换成 BSON

insert_one():插入一个文档

# 调用方法 result = db.COLLECTION_NAME.insert_one(doc) # 返回插入的文档ID result.inserted _id

insert_many():批量新增文档。调用方法:

doc_list = [doc1,doc2] result = db.COLLECTION_NAME.insert_many(doc_list)

示例:

三、查询文档

pymongo 可以将查询的结果转换成 python 中的对象

常用方法:

find_one(): 按条件查询一个文档 find(): 按条件查询多个文档 count_documents(): 统计满足条件的文档总数 aggregate(): 聚合统计 .sort(): 排序 .skip().limit(): 分页

示例代码:

# -*- coding: utf-8 -*- # @Time    : 2023-03-18 15:03 # @Author  : AmoXiang # @File    : 5.查询文档.py # @Software: PyCharm # @Blog    : https://blog.csdn.net/xw1680 from bson.objectid import ObjectId from pymongo import MongoClient, ASCENDING, DESCENDING class LearnMongoDBSearch(object):     """ MongoDB查询练习 """     def __init__(self):         self.client = MongoClient()     def search_one(self):         """ 查询一个文档 """         temp_obj = self.client.test.newdb.find_one()         print(temp_obj)         print('喜欢分数:', temp_obj['likes'])         # print('注册时间:', temp_obj['reg_date'].date())         print('姓名:', temp_obj['uname'])     def search_user_by_pk(self, pk):         obj_id = ObjectId(pk)         # user_obj = self.client.test.newdb.find_one({'_id': obj_id})         # 面向对象的方法,有代码提示         db = self.client.get_database('test')         users = db.get_collection('newdb')         user_obj = users.find_one({'_id': obj_id})         print(user_obj)     def search_many(self):         """ 查询多个文档 """         db = self.client.get_database('test')         students = db.get_collection('students')         # stu_list = students.find()         # for item in stu_list:         #     print(item)         # 查询年龄大于12岁的学生         stu_list = students.find({'age': {'$gt': 12}}, {'stu_name': 1, 'class_name': 1, 'age': 1})         for item in stu_list:             # 注意:  mongo中存储的整数转换成了浮点数             print(item)     def paginate(self, page=1, page_size=10):         """         分页处理         :param page: 当前的页         :param page_size: 每一页数据大小         :return:         """         db = self.client.get_database('test')         students = db.get_collection('students')         offset = (page - 1) * page_size         stu_list = students.find().skip(offset).limit(page_size)         return stu_list     def sort_data(self):         """ 排序 """         db = self.client.get_database('test')         grades = db.get_collection('grades')         # // 将学生的语文成绩从高到低排序         # db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1});         #         # //将学生的语文成绩按照年龄和成绩排序         # db.grades.find({"grade.course_name": "语文"}).sort({"age": -1, "grade.score": -1});         # db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1, "age": -1});         # 按某一列排序         # data_list = grades.find({"grade.course_name": "语文"}).sort("grade.score", DESCENDING);         # for item in data_list:         #     print(item)         # 按多列排序         data_list = grades.find({"grade.course_name": "语文"}).sort([('age', DESCENDING),("grade.score", DESCENDING),])         for item in data_list:             print(item)     def counter_students(self):         """ 统计newdb中文档总数 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         # result = newdb.count_documents({})         result = newdb.count_documents({"uname": {"$eq": "张三"}})         print(result)     def test_aggregate(self):         """         聚合统计:及格的学生成绩         """         db = self.client.get_database('test')         grades = db.get_collection('grades')         result = grades.aggregate([             # //where             {                 '$match': {"grade.score": {'$gte': 60}}             },             # //group by             {                 '$group': {                     '_id': "$stu_no",                     'total': {'$sum': 1}                 }             },             # // having             {                 '$match': {                     'total': {'$eq': 3}                 }             }             ])         for item in result:             print(item) if __name__ == '__main__':     obj = LearnMongoDBSearch()     # obj.search_one()     # obj.search_user_by_pk('6411ee77b6170000b4003f95')     # obj.search_many()     # stu_list = obj.paginate(page=3)     # for item in stu_list:     #     print(item)     # obj.sort_data()     # obj.counter_students()     obj.test_aggregate()

四、更新文档

回顾,更新数据表达式,如下表所示:

Python怎么操作MongoDB数据库

修改一个文档,调用方法:

update_one(filter, update, *args)

替换一个文档,调用方法:

replace_one(filter, replacement, *args)

批量修改文档,调用方法:

replace_one(filter, replacement, *args)

快捷方法:

find_one_and_update(filter, update, *args)  # 修改一个文档 find_one_and_replace(filter, replacement, *args)  # 替换一个文档 # 注意返回值的不同

返回结果:

acknowledged:结果是否已经被确认modified_count:修改的文档数matched_count:满足条件的文档数raw_result:原始数据upserted_id:更新的ID (upsert=True)

示例代码:

# -*- coding: utf-8 -*- # @Time    : 2023-03-18 14:56 # @Author  : AmoXiang # @File    : 4.修改文档.py # @Software: PyCharm # @Blog    : https://blog.csdn.net/xw1680 from pymongo import MongoClient class LearnMongoDBUpdate(object):     """ MongoDB更新练习 """     def __init__(self):         self.client = MongoClient()     def test_update_one(self):         """ 更新一个文档 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         result = newdb.update_one({}, {'$set': {'likes': 70}})         print(result.modified_count)     def test_replace_one(self):         """ 替换一个文档 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         result = newdb.replace_one({}, {'uname': '张三'})         print(result.modified_count)     def test_update_many(self):         """ 批量更新文档 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         result = newdb.update_many({}, {'$set': {'likes': 90}})         print('修改的数量:', result.modified_count)         print('满足条件的数量:', result.matched_count)     def test_update_shortcut(self):         """ 更新文档的快捷方法 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         # 此处返回一个文档对象         result = newdb.find_one_and_update({}, {'$set': {'sex': '未知'}})         print(result['_id']) if __name__ == '__main__':     obj = LearnMongoDBUpdate()     # obj.test_update_one()     # obj.test_replace_one()     # obj.test_update_many()     obj.test_update_shortcut()

五、删除文档

删除一个文档,调用方法如下:

result = db.COLLECTION_NAME.delete_one(filter, *args) # 返回已经删除的记录数 result.deleted_count # 删除时返回文档对象  find_one_and_delete(filter, *args)

批量删除文档,调用方法:

result = db.COLLECTION_NAME.delete_many(filter, *args) # 返回已经删除的记录数 result. deleted_count

示例代码:

# -*- coding: utf-8 -*- # @Time    : 2023-03-18 14:34 # @Author  : AmoXiang # @File    : 3.删除文档.py # @Software: PyCharm # @Blog    : https://blog.csdn.net/xw1680 from pymongo import MongoClient class LearnMongoDBDelete(object):     """ MongoDB删除练习 """     def __init__(self):         self.client = MongoClient()     def test_delete_one(self):         """ 删除一个文档 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         result = newdb.delete_one({})         print(result.deleted_count)     def test_delete_many(self):         """ 批量删除文档 """         db = self.client.get_database('test')         users = db.get_collection('newdb')         # 删除所有的数据         result = users.delete_many({"likes": {"$lte": 90}})         print(result.deleted_count)     def test_delete_shortcut(self):         """ 删除文档的快捷方法 """         db = self.client.get_database('test')         newdb = db.get_collection('newdb')         result = newdb.find_one_and_delete({})         print(result['title']) if __name__ == '__main__':     obj = LearnMongoDBDelete()     # obj.test_delete_one()     # obj.test_delete_shortcut()     obj.test_delete_many() 转载请注明:Python怎么操作MongoDB数据库 | 李雷博客 - PHP博客


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有